home *** CD-ROM | disk | FTP | other *** search
- ## system-config-printer
-
- ## Copyright (C) 2006, 2007, 2008, 2009, 2010 Red Hat, Inc.
- ## Copyright (C) 2006 Florian Festi <ffesti@redhat.com>
- ## Copyright (C) 2007, 2008, 2009 Tim Waugh <twaugh@redhat.com>
-
- ## This program is free software; you can redistribute it and/or modify
- ## it under the terms of the GNU General Public License as published by
- ## the Free Software Foundation; either version 2 of the License, or
- ## (at your option) any later version.
-
- ## This program is distributed in the hope that it will be useful,
- ## but WITHOUT ANY WARRANTY; without even the implied warranty of
- ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- ## GNU General Public License for more details.
-
- ## You should have received a copy of the GNU General Public License
- ## along with this program; if not, write to the Free Software
- ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
- import cupshelpers
- from debug import *
- import errno
- import socket, time
- import gtk
- from timedops import TimedOperation
- import subprocess
- import threading
- import errno
- import cups
-
- try:
- import pysmb
- PYSMB_AVAILABLE=True
- except:
- PYSMB_AVAILABLE=False
- class pysmb:
- class AuthContext:
- pass
-
- def wordsep (line):
- words = []
- escaped = False
- quoted = False
- in_word = False
- word = ''
- n = len (line)
- for i in range (n):
- ch = line[i]
- if escaped:
- word += ch
- escaped = False
- continue
-
- if ch == '\\':
- in_word = True
- escaped = True
- continue
-
- if in_word:
- if quoted:
- if ch == '"':
- quoted = False
- else:
- word += ch
- elif ch.isspace ():
- words.append (word)
- word = ''
- in_word = False
- elif ch == '"':
- quoted = True
- else:
- word += ch
- else:
- if ch == '"':
- in_word = True
- quoted = True
- elif not ch.isspace ():
- in_word = True
- word += ch
-
- if word != '':
- words.append (word)
-
- return words
-
- ### should be ['network', 'foo bar', ' ofoo', '"', '2 3']
- ##print wordsep ('network "foo bar" \ ofoo "\\"" 2" "3')
-
- def open_socket(hostname, port):
- try:
- host, port = hostname.split(":", 1)
- except ValueError:
- host = hostname
-
- s = None
- try:
- ai = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM)
- except socket.gaierror:
- ai = []
-
- for res in ai:
- af, socktype, proto, canonname, sa = res
- try:
- s = socket.socket(af, socktype, proto)
- s.settimeout(0.5)
- except socket.error, msg:
- s = None
- continue
- try:
- s.connect(sa)
- except socket.error, msg:
- s.close()
- s = None
- continue
- break
- return s
-
- class LpdServer:
- def __init__(self, hostname):
- self.hostname = hostname
- self.max_lpt_com = 8
-
- def probe_queue(self,name, result):
- s = open_socket(self.hostname, 515)
- if not s: return False
- print name
-
- try:
- s.send('\2%s\n' % name) # cmd send job to queue
- data = s.recv(1024) # receive status
- print repr(data)
- except socket.error, msg:
- print msg
- try:
- s.close ()
- except:
- pass
-
- return False
-
- if len(data)>0 and ord(data[0])==0:
- try:
- s.send('\1\n') # abort job again
- s.close ()
- except:
- pass
-
- result.append(name)
- return True
-
- try:
- s.close()
- except:
- pass
-
- return False
-
- def get_possible_queue_names (self):
- candidate = ["PASSTHRU", "ps", "lp", "PORT1", ""]
- for nr in range (self.max_lpt_com):
- candidate.extend (["LPT%d" % nr,
- "LPT%d_PASSTHRU" % nr,
- "COM%d" % nr,
- "COM%d_PASSTHRU" % nr])
- for nr in range (50):
- candidate.append ("pr%d" % nr)
-
- return candidate
-
- def probe(self):
- result = []
- for name in self.get_possible_queue_names ():
- while gtk.events_pending ():
- gtk.main_iteration ()
-
- found = self.probe_queue(name, result)
- if not found and name.startswith ("pr"):
- break
- time.sleep(0.1) # avoid DOS and following counter measures
-
- return result
-
- class BackgroundSmbAuthContext(pysmb.AuthContext):
- """An SMB AuthContext class that is only ever run from
- a non-GUI thread."""
-
- def __init__ (self, *args, **kwargs):
- self._gui_event = threading.Event ()
- pysmb.AuthContext.__init__ (self, *args, **kwargs)
-
- def _do_perform_authentication (self):
- gtk.gdk.threads_enter ()
- result = pysmb.AuthContext.perform_authentication (self)
- gtk.gdk.threads_leave ()
- self._do_perform_authentication_result = result
- self._gui_event.set ()
-
- def perform_authentication (self):
- if (self.passes == 0 or
- not self.has_failed or
- not self.auth_called or
- (self.auth_called and not self.tried_guest)):
- # Safe to call the base function. It won't try any UI stuff.
- return pysmb.AuthContext.perform_authentication (self)
-
- self._gui_event.clear ()
- gobject.timeout_add (1, self._do_perform_authentication)
- self._gui_event.wait ()
- return self._do_perform_authentication_result
-
- class PrinterFinder:
- def __init__ (self):
- self.quit = False
-
- def find (self, hostname, callback_fn):
- self.hostname = hostname
- self.callback_fn = callback_fn
- self.op = TimedOperation (self._do_find, callback=lambda x, y: None)
-
- def cancel (self):
- self.op.cancel ()
- self.quit = True
-
- def _do_find (self):
- self._cached_attributes = dict()
- for fn in [self._probe_jetdirect,
- self._probe_ipp,
- self._probe_snmp,
- self._probe_lpd,
- self._probe_hplip,
- self._probe_smb]:
- if self.quit:
- return
-
- try:
- fn ()
- except Exception, e:
- nonfatalException ()
-
- # Signal that we've finished.
- if not self.quit:
- self.callback_fn (None)
-
- def _new_device (self, uri, info, location = None):
- device_dict = { 'device-class': 'network',
- 'device-info': "%s" % info }
- if location:
- device_dict['device-location']=location
- device_dict.update (self._cached_attributes)
- new_device = cupshelpers.Device (uri, **device_dict)
- debugprint ("Device found: %s" % uri)
- self.callback_fn (new_device)
-
- def _probe_snmp (self):
- # Run the CUPS SNMP backend, pointing it at the host.
- null = file ("/dev/null", "r+")
- try:
- p = subprocess.Popen (args=["/usr/lib/cups/backend/snmp",
- self.hostname],
- close_fds=True,
- stdin=null,
- stdout=subprocess.PIPE,
- stderr=null)
- except OSError, e:
- if e == errno.ENOENT:
- return
-
- raise
-
- (stdout, stderr) = p.communicate ()
- if p.returncode != 0:
- return
-
- if self.quit:
- return
-
- for line in stdout.split ('\n'):
- words = wordsep (line)
- n = len (words)
- if n == 6:
- (device_class, uri, make_and_model,
- info, device_id, device_location) = words
- elif n == 5:
- (device_class, uri, make_and_model, info, device_id) = words
- elif n == 4:
- (device_class, uri, make_and_model, info) = words
- else:
- continue
-
- device_dict = { 'device-class': device_class,
- 'device-make-and-model': make_and_model,
- 'device-info': info }
- if n == 5:
- device_dict['device-id'] = device_id
- if n == 6:
- device_dict['device-location'] = device_location
-
- device = cupshelpers.Device (uri, **device_dict)
- self.callback_fn (device)
-
- # Cache the make and model for use by other search methods
- # that are not able to determine it.
- self._cached_attributes['device-make-and-model'] = make_and_model
-
- def _probe_lpd (self):
- lpd = LpdServer (self.hostname)
- for name in lpd.get_possible_queue_names ():
- if self.quit:
- return
-
- found = lpd.probe_queue (name, [])
- if found:
- uri = "lpd://%s/%s" % (self.hostname, name)
- self._new_device(uri, self.hostname)
-
- if not found and name.startswith ("pr"):
- break
-
- time.sleep(0.1) # avoid DOS and following counter measures
-
- def _probe_hplip (self):
- null = file ("/dev/null", "r+")
- try:
- p = subprocess.Popen (args=["hp-makeuri", "-c", self.hostname],
- close_fds=True,
- stdin=null,
- stdout=subprocess.PIPE,
- stderr=null)
- except OSError, e:
- if e == errno.ENOENT:
- return
-
- raise
-
- (stdout, stderr) = p.communicate ()
- if p.returncode != 0:
- return
-
- if self.quit:
- return
-
- uri = stdout.strip ()
- if uri.find (":") != -1:
- self._new_device(uri, uri)
-
- def _probe_smb (self):
- if not PYSMB_AVAILABLE:
- return
-
- smbc_auth = BackgroundSmbAuthContext ()
- debug = 0
- if get_debugging ():
- debug = 10
- ctx = pysmb.smbc.Context (debug=debug,
- auth_fn=smbc_auth.callback)
- entries = []
- uri = "smb://%s/" % self.hostname
- try:
- while smbc_auth.perform_authentication () > 0:
- if self.quit:
- return
-
- try:
- entries = ctx.opendir (uri).getdents ()
- except Exception, e:
- smbc_auth.failed (e)
- except RuntimeError, (e, s):
- if e not in [errno.ENOENT, errno.EACCES, errno.EPERM]:
- debugprint ("Runtime error: %s" % repr ((e, s)))
- except:
- nonfatalException ()
-
- if self.quit:
- return
-
- for entry in entries:
- if entry.smbc_type == pysmb.smbc.PRINTER_SHARE:
- uri = "smb://%s/%s" % (self.hostname, entry.name)
- info = "SMB (%s)" % self.hostname
- self._new_device(uri, info)
-
- def _probe_jetdirect (self):
- port = 9100 #jetdirect
- sock_address = (self.hostname, port)
- s = open_socket(self.hostname, port)
- if not s:
- debugprint ("%s:%d CLOSED" % sock_address)
- else:
- # port is open so assume its a JetDirect device
- debugprint ("%s:%d OPEN" % sock_address)
- uri = "socket://%s:%d" % sock_address
- info = "JetDirect (%s)" % self.hostname
- self._new_device(uri, info)
- s.close ()
-
- def _probe_ipp (self):
- try:
- ai = socket.getaddrinfo(self.hostname, 631, socket.AF_UNSPEC,
- socket.SOCK_STREAM)
- except socket.gaierror:
- debugprint ("Can't resolve %s" % self.hostname)
- return
- for res in ai:
- af, socktype, proto, canonname, sa = res
- if (af == socket.AF_INET and sa[0] == '127.0.0.1' or
- af == socket.AF_INET6 and sa[0] == '::1'):
- debugprint ("Do not probe local cups server")
- return
-
- try:
- c = cups.Connection (host = self.hostname)
- except RuntimeError:
- debugprint ("Can't connect to server/printer")
- return
-
- try:
- printers = c.getPrinters ()
- except cups.IPPError:
- debugprint ("%s is probably not a cups server but IPP printer" %
- self.hostname)
- uri = "ipp://%s:631/ipp" % (self.hostname)
- info = "IPP (%s)" % self.hostname
- self._new_device(uri, info)
- return
-
- for name, queue in printers.iteritems ():
- uri = queue['printer-uri-supported']
- info = queue['printer-info']
- location = queue['printer-location']
- self._new_device(uri, info, location)
-
-
-